fix(waterdata): gate the chunked fan-out with a semaphore, not the connection pool#322
Draft
thodson-usgs wants to merge 1 commit into
Draft
fix(waterdata): gate the chunked fan-out with a semaphore, not the connection pool#322thodson-usgs wants to merge 1 commit into
thodson-usgs wants to merge 1 commit into
Conversation
1c676b8 to
c713659
Compare
ChunkedCall._run dispatched every pending sub-request into one asyncio.gather and relied on the shared httpx.AsyncClient connection pool as the only concurrency throttle (max_connections sized from API_USGS_CONCURRENT). That collides with the client's pool-acquire timeout (60 s, from HTTPX_DEFAULTS): a sub-request that can't get a connection waits in httpx's pool queue, and that wait is bounded by the pool-acquire timeout. So whenever every pooled connection stays busy past that window with none freeing — a batch of large, slowly-streaming pages is enough — the still-queued tail of the fan-out times out with httpx.PoolTimeout. Being a TransportError it burns the per-sub-request retry budget and ultimately surfaces as a bogus *resumable* ServiceInterrupted, telling the user to wait for an upstream that never saw the request. Gate each fetch attempt with an asyncio.Semaphore sized from API_USGS_CONCURRENT instead; the connection pool is now merely sized to match so in-flight sub-requests reuse keepalive connections. Parked sub-requests wait on the semaphore before they touch the pool, so no transport clock runs while they wait and the pool timeout reverts to its protective role (a genuinely wedged checkout). The slot is acquired per attempt, so a sub-request sleeping off a retry backoff doesn't hold one. "unbounded" degenerates to a semaphore sized at the plan total, so there is a single gated code path. Observable behavior is otherwise unchanged: same plan, same sub-request order, same resume semantics. Tests: - in-flight high-water-mark probe (parametrized capped/unbounded) — the fetch-level concurrency equals the cap, not the plan total; the capped case fails on the pre-fix code. - real-localhost-server end-to-end test — mock transports bypass the pool, so this drives the chunker's shared client against a slow server past a scaled-down pool timeout; reproduces the spurious resumable ServiceInterrupted on the pre-fix code and completes on this branch. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
c713659 to
53a4c7e
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Problem
ChunkedCall._rundispatches every pending sub-request into oneasyncio.gatherand relies on the sharedhttpx.AsyncClient's connection pool as the only concurrency throttle (max_connections = API_USGS_CONCURRENT). That collides with the client's pool-acquire timeout (60 s, fromHTTPX_DEFAULTS):httpx.PoolTimeout.TransportError, thatPoolTimeoutburns the per-sub-request retry budget and ultimately surfaces as a bogus resumableServiceInterrupted, telling the user to wait for an upstream that never saw the request.Fix
Gate each fetch attempt with an
asyncio.Semaphoresized fromAPI_USGS_CONCURRENT; the connection pool is now merely sized to match so in-flight sub-requests reuse keepalive connections. Parked sub-requests wait on the semaphore before they touch the pool — no transport clock runs while they wait, so the pool-acquire timeout reverts to its protective role (a genuinely wedged checkout). The slot is acquired per attempt (inside the retry driver), so a sub-request sleeping off a retry backoff doesn't hold one.unboundeddegenerates to a semaphore sized at the plan total, so there is a single gated code path.Observable behavior is otherwise unchanged: same plan, same sub-request order, same resume semantics.
Why not just disable the pool timeout?
Setting
pool=Noneon the client would suppress the spurious failure but lose the stuck-checkout protection and leave dispatch breadth-first in one FIFO. The semaphore removes the failure mode, keeps the timeout meaningful, and bounds in-flight work explicitly.Tests
test_fan_out_in_flight_high_water_mark_is_the_cap(parametrized capped/unbounded) — the fetch-level concurrency equals the cap, not the plan total; the capped case fails on the pre-fix code.test_fan_out_outlives_pool_timeout_on_real_transport— mock transports bypass the pool, so this drives the chunker's shared client against a slow localhost server past a scaled-down pool timeout; it reproduces the spurious resumableServiceInterruptedon the pre-fix code and completes on this branch.Verified end-to-end against the live USGS API: with the pre-fix code 6/8 runs interrupted (in-flight peak 8, pool the only throttle); with the fix 0/8 (in-flight peak 2, semaphore gating). Full suite green,
ruffandmypy --strict dataretrieval/clean.🤖 Generated with Claude Code